import type { Config } from './types';

/**
 * Options accepted by `createSseClient`.
 *
 * Combines the standard `RequestInit` fields (except `method`, which comes
 * from `Config`) with SSE-specific callbacks and retry settings.
 *
 * `TData` is the type of `data` on events delivered to `onSseEvent`.
 */
export type ServerSentEventsOptions<TData = unknown> = Omit<
  RequestInit,
  'method'
> &
  Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & {
    /**
     * Fetch API implementation. You can use this option to provide a custom
     * fetch instance.
     *
     * @default globalThis.fetch
     */
    fetch?: typeof fetch;
    /**
     * Implementing clients can call request interceptors inside this hook.
     *
     * It receives the URL and the request init built for the current
     * attempt; the `Request` it resolves to is the one that gets fetched.
     */
    onRequest?: (url: string, init: RequestInit) => Promise<Request>;
    /**
     * Callback invoked when a network or parsing error occurs during streaming.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param error The error that occurred.
     */
    onSseError?: (error: unknown) => void;
    /**
     * Callback invoked when an event is streamed from the server.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param event Event streamed from the server.
     * @returns Nothing (void).
     */
    onSseEvent?: (event: StreamEvent<TData>) => void;
    /**
     * Already-serialized request body. It is used as the `body` of the
     * outgoing request.
     */
    serializedBody?: RequestInit['body'];
    /**
     * Default retry delay in milliseconds.
     *
     * A valid `retry:` field received from the server replaces this value.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 3000
     */
    sseDefaultRetryDelay?: number;
    /**
     * Maximum number of connection attempts, counting the initial one, before
     * giving up.
     *
     * When omitted, the number of attempts is not limited.
     */
    sseMaxRetryAttempts?: number;
    /**
     * Maximum retry delay in milliseconds.
     *
     * Applies only when exponential backoff is used.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 30000
     */
    sseMaxRetryDelay?: number;
    /**
     * Optional sleep function for retry backoff.
     *
     * Defaults to using `setTimeout`.
     */
    sseSleepFn?: (ms: number) => Promise<void>;
    /**
     * URL of the endpoint to request.
     */
    url: string;
  };

/**
 * A single event as reported to the `onSseEvent` callback.
 */
export interface StreamEvent<TData = unknown> {
  /** Payload built from the event's `data:` lines. */
  data: TData;
  /** Value of the event's `event:` field, if it had one. */
  event?: string;
  /** Most recent `id:` value received on the stream, if any. */
  id?: string;
  /** Retry delay in milliseconds in effect when the event was reported. */
  retry?: number;
}

/**
 * Result returned by `createSseClient`.
 *
 * `stream` is an async generator of event payloads. When `TData` is an object
 * type (`Record<string, unknown>`), the yielded type is the union of its
 * property value types; otherwise it is `TData` itself.
 */
export type ServerSentEventsResult<
  TData = unknown,
  TReturn = void,
  TNext = unknown,
> = {
  stream: AsyncGenerator<
    TData extends Record<string, unknown> ? TData[keyof TData] : TData,
    TReturn,
    TNext
  >;
};

/**
 * Creates a Server-Sent Events (SSE) client for a single endpoint.
 *
 * The returned `stream` is an async generator; the request is only sent once
 * the generator is first iterated. For every event block received, the
 * `data:` lines are joined with `\n` and parsed as JSON, falling back to the
 * raw text when parsing fails. JSON payloads are passed through
 * `responseValidator` and `responseTransformer` when provided. `onSseEvent`
 * is invoked for every block, and blocks that carry data are yielded.
 *
 * When the request or the stream fails, `onSseError` is invoked and the
 * connection is retried with exponential backoff, sending the last seen event
 * ID in the `Last-Event-ID` header. The generator finishes when the server
 * closes the response, when `signal` is aborted, or when
 * `sseMaxRetryAttempts` has been reached.
 *
 * @param options Request and streaming options.
 * @returns An object whose `stream` yields the data of each received event.
 */
export const createSseClient = <TData = unknown>({
  onRequest,
  onSseError,
  onSseEvent,
  responseTransformer,
  responseValidator,
  sseDefaultRetryDelay,
  sseMaxRetryAttempts,
  sseMaxRetryDelay,
  sseSleepFn,
  url,
  ...options
}: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
  // Kept outside the generator loop so a reconnect can resume after the last
  // event that was seen.
  let lastEventId: string | undefined;

  const sleep =
    sseSleepFn ??
    ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));

  const createStream = async function* () {
    let retryDelay: number = sseDefaultRetryDelay ?? 3000;
    let attempt = 0;
    const signal = options.signal ?? new AbortController().signal;

    while (true) {
      if (signal.aborted) break;

      attempt++;

      const headers =
        options.headers instanceof Headers
          ? options.headers
          : new Headers(options.headers as Record<string, string> | undefined);

      if (lastEventId !== undefined) {
        headers.set('Last-Event-ID', lastEventId);
      }

      try {
        const requestInit: RequestInit = {
          redirect: 'follow',
          ...options,
          body: options.serializedBody,
          headers,
          signal,
        };
        let request = new Request(url, requestInit);
        if (onRequest) {
          request = await onRequest(url, requestInit);
        }
        // fetch must be assigned here, otherwise it would throw the error:
        // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
        const _fetch = options.fetch ?? globalThis.fetch;
        const response = await _fetch(request);

        if (!response.ok)
          throw new Error(
            `SSE failed: ${response.status} ${response.statusText}`,
          );

        if (!response.body) throw new Error('No body in SSE response');

        const reader = response.body
          .pipeThrough(new TextDecoderStream())
          .getReader();

        let buffer = '';
        // True when the previous chunk ended with a bare CR: a LF at the start
        // of the next chunk then belongs to the same CRLF line ending.
        let pendingCr = false;

        // Cancels the body stream, ignoring failures reported either
        // synchronously or through the returned promise.
        const cancelReader = () => {
          try {
            reader.cancel().catch(() => {
              // noop
            });
          } catch {
            // noop
          }
        };

        signal.addEventListener('abort', cancelReader);

        try {
          while (true) {
            const { done, value } = await reader.read();
            if (done) break;

            // Normalize CRLF and bare CR line endings to LF so that event
            // framing below works for every line ending SSE allows.
            let text = value;
            if (pendingCr && text.length > 0) {
              if (text.startsWith('\n')) text = text.slice(1);
              pendingCr = false;
            }
            if (text.endsWith('\r')) pendingCr = true;
            buffer += text.replace(/\r\n?/g, '\n');

            // Events are separated by a blank line; the last piece may be an
            // incomplete event and is kept for the next chunk.
            const chunks = buffer.split('\n\n');
            buffer = chunks.pop() ?? '';

            for (const chunk of chunks) {
              const lines = chunk.split('\n');
              const dataLines: Array<string> = [];
              let eventName: string | undefined;

              for (const line of lines) {
                if (line.startsWith('data:')) {
                  dataLines.push(line.replace(/^data:\s*/, ''));
                } else if (line.startsWith('event:')) {
                  eventName = line.replace(/^event:\s*/, '');
                } else if (line.startsWith('id:')) {
                  lastEventId = line.replace(/^id:\s*/, '');
                } else if (line.startsWith('retry:')) {
                  const parsed = Number.parseInt(
                    line.replace(/^retry:\s*/, ''),
                    10,
                  );
                  if (!Number.isNaN(parsed)) {
                    retryDelay = parsed;
                  }
                }
              }

              let data: unknown;
              let parsedJson = false;

              if (dataLines.length) {
                const rawData = dataLines.join('\n');
                try {
                  data = JSON.parse(rawData);
                  parsedJson = true;
                } catch {
                  // Not JSON: hand the raw text to the consumer as-is.
                  data = rawData;
                }
              }

              // Validation and transformation only apply to JSON payloads.
              if (parsedJson) {
                if (responseValidator) {
                  await responseValidator(data);
                }

                if (responseTransformer) {
                  data = await responseTransformer(data);
                }
              }

              onSseEvent?.({
                data,
                event: eventName,
                id: lastEventId,
                retry: retryDelay,
              });

              if (dataLines.length) {
                yield data as any;
              }
            }
          }
        } finally {
          signal.removeEventListener('abort', cancelReader);
          // Also reached when the consumer stops iterating early or a
          // callback throws: cancel the body so the connection is released
          // instead of staying open. Cancelling a finished stream is harmless.
          cancelReader();
          reader.releaseLock();
        }

        break; // exit loop on normal completion
      } catch (error) {
        // connection failed or aborted; report it before deciding to retry
        onSseError?.(error);

        // Nothing to retry once the caller has aborted; skip the backoff wait.
        if (signal.aborted) break;

        if (
          sseMaxRetryAttempts !== undefined &&
          attempt >= sseMaxRetryAttempts
        ) {
          break; // stop after firing error
        }

        // exponential backoff: double the delay each attempt, capped at
        // sseMaxRetryDelay (30s by default)
        const backoff = Math.min(
          retryDelay * 2 ** (attempt - 1),
          sseMaxRetryDelay ?? 30000,
        );
        await sleep(backoff);
      }
    }
  };

  const stream = createStream();

  return { stream };
};
